{
  "metadata" : {
    "name" : "Simple ML in Spark",
    "user_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : "/root/.ivy2",
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : {
      "spark.executor.memory" : "1024m",
      "spark.cassandra.connection.host" : "127.0.0.1",
      "spark.cores.max" : "2",
      "spark.executor.cores" : "2",
      "spark.master" : "local[*]"
    }
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Data types for MLLib: Breeze Vectors"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.linalg.{Vector, Vectors}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val nbValues = 3\nval indexes = Array(0, 2)\nval values = Array(1.0, 3.0)\nval sv1: Vector = Vectors.sparse(nbValues, indexes, values)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "sv1.toDense",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### LabeledPoints"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.regression.LabeledPoint",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Create an RDD of LabeledPoints from a DataFrame"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":sh wget https://s3-eu-west-1.amazonaws.com/spark-notebook-data/closes.csv -O /tmp/closes.csv",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val c = sparkContext.textFile(\"/tmp/closes.csv\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.sql.SQLContext\nval sqlContext = new SQLContext(sparkContext)\nimport sqlContext.implicits._ ",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// One parsed input row: stock identifier, date as epoch milliseconds, price.\ncase class Quote(stock:String, date:Long, price:Double) extends java.io.Serializable\nobject Quote {\n  // SimpleDateFormat is not thread-safe, and parse() is called from Spark tasks that may run\n  // concurrently in one JVM, so build a fresh formatter per access instead of sharing one.\n  def df = new java.text.SimpleDateFormat(\"yyyy-MM-dd\")\n  // Converts a yyyy-MM-dd string to epoch milliseconds; df.parse throws java.text.ParseException on bad input.\n  def parse(d: String): Long = df.parse(d).getTime\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val q  = c.map(_.split(\",\").toList)\n           .map{ \n             case List(s, d, p) => Quote(s, Quote.parse(d), p.toDouble)\n           }\n           .toDF\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "q.cache()\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.sql.functions._\nval maxDate = q.agg(max(\"date\")).map(_.getAs[Long](0)).first",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val stocks = q.filter($\"date\" === maxDate && $\"price\" > 20).select(\"stock\").distinct.sample(false, 0.05, 124L)\n              .map(_.getAs[String](0))\n              .collect\n              .toSet                                                                                                       + \"MCHB\" // ok ok we're owned! :-D   \nval nb = stocks.size\nval broadStocks = sparkContext.broadcast(stocks)\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val strip = q.filter($\"stock\" isin (broadStocks.value.toList:_*)).cache()\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val dates = strip.groupBy($\"date\").count.filter($\"count\" === nb)\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val cleanDF = strip.join(dates, dates(\"date\") === q(\"date\")).select($\"stock\", dates(\"date\"), $\"price\")\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val takeOutMCHB_DF = cleanDF.withColumn(\"label\", when($\"stock\" startsWith \"MCHB\", true).otherwise(false)).cache()\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "LineChart(\n  takeOutMCHB_DF.filter($\"stock\" startsWith \"VIDI\").orderBy(\"date\").map(r => r.getAs[Long](1) → r.getAs[Double](2)).collect,\n  maxPoints=90\n)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val takeOutMCHB = \n  takeOutMCHB_DF.rdd.map { row => \n    (row.getAs[String](\"stock\"), row.getAs[Long](\"date\"), row.getAs[Double](\"price\"), row.getAs[Boolean](\"label\"))\n  }",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val byDateCloses = takeOutMCHB.groupBy( elt => elt._2 )\n                                .mapValues{ it => \n                                  val arr = it.toArray\n                                  val label = arr.find(_._4).map(_._3).get\n                                  val features = arr.filter(!_._4).sortBy(_._1).map(_._3)\n                                  label +: features\n                                }\n                                .values",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val points = byDateCloses.map{ arr => \n                            LabeledPoint(arr(0), Vectors.dense(1d /*add intercept*/ +: arr.drop(1)))\n                          }",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "# Method"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Split the data into training and validation sets (30% held out for validation)."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SquaredL2Updater}\nimport org.apache.spark.mllib.regression.{LinearRegressionModel}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val seed = 42L\nval Array(trainingData, validationData) = points.randomSplit(Array(0.7, 0.3), seed)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "/**\n * Fits a least-squares linear model with an L2 (squared) updater using L-BFGS.\n *\n * @param λ            regularization parameter handed to the optimizer\n * @param trainingData labeled points to fit; the weight vector gets one entry per feature of its first point\n * @return a LinearRegressionModel built from the optimized weights and an intercept of 0.0\n */\ndef createModel(λ:Double = 0.01, trainingData:RDD[LabeledPoint]) = {\n  // One weight per feature, all starting at zero.\n  val featureCount = trainingData.first.features.size\n  val startingWeights = Vectors.dense(Array.fill(featureCount)(0.0))\n\n  // The optimizer consumes (label, features) pairs, i.e. an RDD[(Double, Vector)].\n  val labelsWithFeatures = trainingData.map(lp => (lp.label, lp.features))\n\n  // runLBFGS also returns the loss history, which is not needed here.\n  val (optimizedWeights, _) = LBFGS.runLBFGS(\n    labelsWithFeatures,\n    new LeastSquaresGradient(),  // loss function\n    new SquaredL2Updater(),\n    10,    // number of corrections\n    1e-4,  // convergence tolerance\n    100,   // maximum number of iterations\n    λ,     // regularization parameter\n    startingWeights\n  )\n\n  // Intercept is fixed at 0.0: the weight vector is expected to carry the intercept term already.\n  new LinearRegressionModel(optimizedWeights, 0.0)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.sql.functions._\n\n/**\n * Scores `model` on `validationData` and returns it together with the square root of the mean squared error.\n *\n * @param model          fitted model used to predict each point's label\n * @param validationData labeled points to score\n * @return (model, sqrt(mean((label - prediction)^2)))\n */\ndef predict(model:LinearRegressionModel, validationData:RDD[LabeledPoint]) = {\n  // Pair each true label with the model's prediction, as a two-column DataFrame.\n  val scored = validationData\n    .map(lp => (lp.label, model.predict(lp.features)))\n    .toDF(\"point\", \"prediction\")\n\n  // Mean of the per-row squared errors.\n  val squaredError = pow($\"point\"-$\"prediction\", 2)\n  val mse = scored.withColumn(\"se\", squaredError).agg(mean(\"se\")).first.getAs[Double](0)\n\n  (model, scala.math.sqrt(mse))\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "/**\n * Trains a model with regularization λ on `trainingData`, scores it on `validationData`\n * and, when a chart is supplied, appends the point (λ, error) to it.\n *\n * @return the (model, error) pair produced by `predict`\n */\ndef run(λ:Double = 0.01,\n        trainingData:RDD[LabeledPoint],\n        validationData:RDD[LabeledPoint],\n        plot:Option[Chart[Seq[(Double, Double)]]]=None) = {\n  val fitted = createModel(λ, trainingData)\n  val (scoredModel, error) = predict(fitted, validationData)\n  // Only touch the chart when one was passed in.\n  for (chart <- plot) chart.addAndApply(Seq((λ, error)))\n  (scoredModel, error)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "sparkContext.setCheckpointDir(\"/tmp/chckdir\")\nval tr = trainingData.coalesce(24, true).cache()\ntr.checkpoint()\nval vd = validationData.coalesce(24, true).cache()\nvd.checkpoint()\ntr.count\nvd.count",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Choose an arbitrary value, $\\lambda = 0.01$:"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val (model, r) = run(0.01, tr, vd)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val withoutReg = run(0, tr, vd)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Let's try many values of this hyperparameter."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val plotRuns = LineChart(Seq((0d, withoutReg._2)), maxPoints=1000)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val testBed = (1d to 20d by 1d) map {i => 10e-8 * math.pow(10, i/2)}\nval runs = testBed.map(λ => (λ, run(λ, tr, vd, plot=Some(plotRuns))))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "// Scan the runs once for the entry with the lowest error and bind its parts,\n// instead of calling minBy three times over the same collection.\nval (bestLambda, (bestModel, bestPerf)) = runs.minBy(_._2._2)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "However, we cannot validate the model against new data, so we only know that this model is the best one for the given data.\n\nIn other words, we have no clue whether it overfits or underfits new data, as long as the variability of the universe is not fully captured by the validation set.\n\n> This assumption is reasonable because the validation set is (generally) only 30 percent of the known data, which is itself probably a small fraction of the universe.\n\n"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Solution: cross-validation!"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## Cross Validation"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "The known data is split into two sets:\n* prediction set (`10%`)\n* learning set (`90%`)\n\nThe learning set is then split into K folds (K = 4 below)."
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "In each round, the held-out fold acts as the validation set and helps us choose the best value for our hyperparameter $\\lambda$.\n\nIt is crucial to estimate this hyperparameter as well as possible because it controls how strongly the model is regularized within its family."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val (predictionSet, learningData) = {\n  val Array(predictionSet, learningData) = points.randomSplit(Array(0.1, 0.9), 555L)\n  val p = predictionSet.coalesce(24, true).cache()\n  p.checkpoint()\n  p.count\n  val l = learningData.coalesce(24, true).cache()\n  l.checkpoint()\n  l.count\n  (p, l)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val (fold1, fold2, fold3, fold4) = {\n  val Array(fold1, fold2, fold3, fold4) = learningData.randomSplit(Array(0.25, 0.25, 0.25, 0.25), 94564L)\n  \n  val f1 = fold1.coalesce(24, true).cache()\n  f1.checkpoint()\n  f1.count\n  val f2 = fold2.coalesce(24, true).cache()\n  f2.checkpoint()\n  f2.count\n  val f3 = fold3.coalesce(24, true).cache()\n  f3.checkpoint()\n  f3.count\n  val f4 = fold4.coalesce(24, true).cache()\n  f4.checkpoint()\n  f4.count\n  (f1, f2, f3, f4)\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val cv1 = (sparkContext.union(fold1, fold2, fold3), fold4)\nval cv2 = (sparkContext.union(fold2, fold3, fold4), fold1)\nval cv3 = (sparkContext.union(fold3, fold4, fold1), fold2)\nval cv4 = (sparkContext.union(fold4, fold1, fold2), fold3)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val plotCv1 = LineChart(Seq((0d, 0d)), maxPoints=1000, sizes=(300,300))\nval plotCv2 = LineChart(Seq((0d, 0d)), maxPoints=1000, sizes=(300,300))\nval plotCv3 = LineChart(Seq((0d, 0d)), maxPoints=1000, sizes=(300,300))\nval plotCv4 = LineChart(Seq((0d, 0d)), maxPoints=1000, sizes=(300,300))\nval allPlot = LineChart(Seq((0d, 0d)), maxPoints=1000, sizes=(600,450))\ncontainerFluid(List(\n  List(plotCv1 → 6, plotCv2 → 6), \n  List(plotCv3 → 6, plotCv4 → 6)\n)) ++ allPlot",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val cvs = testBed.drop(5).map { λ => \n  val onCv1 = run(λ, cv1._1, cv1._2, plot=Some(plotCv1))._2\n  val onCv2 = run(λ, cv2._1, cv2._2, plot=Some(plotCv2))._2\n  val onCv3 = run(λ, cv3._1, cv3._2, plot=Some(plotCv3))._2\n  val onCv4 = run(λ, cv4._1, cv4._2, plot=Some(plotCv4))._2\n  val result = (λ, onCv1+onCv2+onCv3+onCv4)\n  allPlot.addAndApply(Seq(result))\n  result\n}\n()",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val bestLambdaCV = cvs.minBy(_._2)._1",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val modelOnInitialTrainingData = createModel(bestLambdaCV, tr)\nval (perfOnInitialValidationData, _r) = predict(modelOnInitialTrainingData, vd)\n_r",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Create model on full learning dataset"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val bestModelCV = createModel(bestLambdaCV, learningData)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Estimate the error on the held-out prediction set."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val cv_r = predict(bestModelCV, predictionSet)._2",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}